import pandas as pd
from application.core.base_extract_faust import BaseExtractFaust
from application.tasks.kafka_to_info_task.data_storage_task import DataStorageTask
from application.tasks.kafka_to_info_task.data_type_task import DataTypeTask
from application.tasks.kafka_to_info_task.deduplication_task import DeduplicationTask
from application.tasks.kafka_to_info_task.translate_time_task import TranslateTimeTask


class KafkaToInfoExtract(BaseExtractFaust):
    flow_name = "基金资讯入库数据流"
    pipeline_list = [
        DataTypeTask(),  # 数据类型转换
        TranslateTimeTask(),  # 时间转换
        DeduplicationTask(),  # 去重
        DataStorageTask()  # 入库
    ]

    async def extract(self, input_data):
        df = pd.DataFrame(input_data)
        return df

# 1、将国自然基金资讯的sql导入数据（nsfc_project_code_dict表是已经存在的表需要正确配置数据库）
# 2、使用test/国自然基金资讯数据表/convert_oss.py转换段落表和附件表的测试服oss路径为线上的oss;
# 3、使用test/nsfc_to_es/main_pandas.py 将数据库内的数据导入到es
# 4、更新nation_foundation的nsfc_info分支的代码
